package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/4/11
 */
public class Demo5_JDBCRead
{
    public static void main(String[] args) {

        //设置表的运行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();

        //不借助流进行操作，直接读取外部数据源，映射为表
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

        /*
   
         */
        String createTableSql = " create table t1 ( id STRING, ts BIGINT , vc INT " +
            "    )WITH (" +
            "  'connector' = 'jdbc'," +
            "   'url' = 'jdbc:mysql://hadoop102:3306/221109'," +
            "   'table-name' = 'ws' ," +
            "   'username' = 'root' ," +
            "   'password' = '000000' " +
            ") ";

        //建表
        tableEnvironment.executeSql(createTableSql);

        //查询
        tableEnvironment.sqlQuery(" select * from t1  ").execute().print();

    }
}
